SESのログをイベント種別毎に分類して保存するFirehoseをCloudFormationで作成してみた
Amazon SES のイベントデータを AWSサービスの Firehose、S3、Athena、QuickSightを利用して解析する方法がAWSブログで紹介されています。
今回、AWSブログで紹介されている CloudFormationテンプレートを改造、 Firehose の 動的パーティション機能を利用し、、ドメイン(ID)、イベントタイプ別に分類されたS3保存を実現。
Amazon SESの 一つの設定セットのイベント送信設定を実施する事で、簡単にAthenaでログ解析を可能とする利用を試す機会がありましたので、紹介させていただきます。
構成図
オリジナル
更新版
Firehose
- Firehoseに動的パーティション設定を追加しました。
- SESのJSON形式のイベントログを、JQを利用してパース。「SenderIdentity」(SES登録のドメイン)と、「EventType」(SESのイベント種別)を動的パーティション対象としました。
Type: AWS::KinesisFirehose::DeliveryStream Properties: ExtendedS3DestinationConfiguration: Prefix: "partitioned/!{partitionKeyFromQuery:SenderIdentity}/!{partitionKeyFromQuery:EventType}/!{timestamp:yyyy/MM/dd}/" ProcessingConfiguration: Enabled: true Processors: - Type: MetadataExtraction Parameters: - ParameterName: MetadataExtractionQuery ParameterValue: '{eventType:.eventType}' - ParameterName: JsonParsingEngine ParameterValue: JQ-1.6 DynamicPartitioningConfiguration: Enabled: true RetryOptions: DurationInSeconds: 300
S3
- ログ出力先のS3バケットは、ストレージ費用節約のため 40日で自動削除する ライフサイクル管理を追加しました。
s3BucketSESEvents: Type: AWS::S3::Bucket Properties: LifecycleConfiguration: Rules: - Id: !Sub 'Delete-After-40-Days' ExpirationInDays: 40 Status: Enabled
設定手順
CloudFormation
修正を反映したFirehose設置用のテンプレートをダウンロードし、任意のスタック名でデプロイします。
‐ デプロイ完了後の設置リソース
SES
Firehoseへのイベント記録を有効とした SESの設定セットを用意します。
イベントタイプ
今回、イベントタイプは「すべて」を選択
送信先
- 送信先タイプは「Firehose」を選択
- 配信ストリームは、CloudFormationで作成したFirehoseのストリームを選択
- IAMロールは、、CloudFormationで作成した、「<スタック名>-ConfigSetPermissionPutFirehose-*」 を選択。 (SESから Firehoseへの記録を許可します)
作成した設定セットは、デフォルトとして SESの設定済みドメイン(ID)に登録しました。
動作確認
SES で送信失敗、バウンスメールとして扱われたログの確認を試みました。
バウンスメールの再現
バウンスメールを発生させるため、SESのシミュレーターとして用意されているメールアドレスを利用しました。
バウンス – 受取人のEメールプロバイダーは、SMTP 550 5.1.1 レスポンスコード (「不明なユーザー」) レスポンスコードで E メールを拒否します
bounce@simulator.amazonses.com
「bounce@simulator.amazonses.com」宛にテストメールの送信を試み、バウンスイベントを発生させました。
S3
s3://aws-s3-ses-analytics-<アカウント>-<リージョン>/partitioned/<ドメイン>/
以下の「Bounce」以下、日付を含むパスで バウンスイベントのログが記録されました。
S3 Selectを利用して、バウンスイベントの詳細を確認出来ました。
Athena
パーティションなし
「/Bounce/」までのS3のパスを指定、Athenaでテーブルを定義しました。
CREATE EXTERNAL TABLE sesmaster_bounce ( eventType string, bounce struct < bouncedrecipients: array < struct < action: string, diagnosticcode: string, emailaddress: string, status: string >>, bouncesubtype: string, bouncetype: string, feedbackid: string, reportingmta: string, `timestamp`: string >, mail struct < timestamp: string, source: string, sourcearn: string, sendingaccountid: string, messageid: string, destination: string, headerstruncated: boolean, headers: array < struct < name: string, value: string >>, commonheaders: struct < `from`: array < string >, to: array < string >, messageid: string, subject: string >, tags: struct < ses_source_tls_version: string, ses_operation: string, ses_configurationset: string, ses_source_ip: string, ses_outgoing_ip: string, ses_from_domain: string, ses_caller_identity: string >> ) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' WITH SERDEPROPERTIES ( "mapping.ses_caller_identity" = "ses:caller-identity", "mapping.ses_configurationset" = "ses:configuration-set", "mapping.ses_from_domain" = "ses:from-domain", "mapping.ses_operation" = "ses:opeation", "mapping.ses_outgoing_ip" = "ses:outgoing-ip", "mapping.ses_source_ip" = "ses:source-ip", "mapping.ses_source_tls_version" = "ses:source-tls-version" ) LOCATION 's3://aws-s3-ses-analytics-<aws-account-number>/partitioned/<Identity>/Bounce/'
- 実行例
- SQL
SELECT * FROM "default"."sesmaster_bounce" ;
パーティション投影有効
- パーティション投影を利用、ドメイン、イベントタイプ、日付で対象を絞り込む利用を試しました。
CREATE EXTERNAL TABLE sesmaster_partition ( eventType string, complaint struct < arrivaldate: string, complainedrecipients: array < struct < emailaddress: string >>, complaintfeedbacktype: string, feedbackid: string, `timestamp`: string, useragent: string >, bounce struct < bouncedrecipients: array < struct < action: string, diagnosticcode: string, emailaddress: string, status: string >>, bouncesubtype: string, bouncetype: string, feedbackid: string, reportingmta: string, `timestamp`: string >, mail struct < timestamp: string, source: string, sourcearn: string, sendingaccountid: string, messageid: string, destination: string, headerstruncated: boolean, headers: array < struct < name: string, value: string >>, commonheaders: struct < `from`: array < string >, to: array < string >, messageid: string, subject: string >, tags: struct < ses_source_tls_version: string, ses_operation: string, ses_configurationset: string, ses_source_ip: string, ses_outgoing_ip: string, ses_from_domain: string, ses_caller_identity: string >>, send string, delivery struct < processingtimemillis: int, recipients: array < string >, reportingmta: string, smtpresponse: string, `timestamp`: string >, open struct < ipaddress: string, `timestamp`: string, userAgent: string >, reject struct < reason: string >, click struct < ipAddress: string, `timestamp`: string, userAgent: string, link: string > ) PARTITIONED BY (sender_identity string, event_type string, day string) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' WITH SERDEPROPERTIES ( "mapping.ses_caller_identity" = "ses:caller-identity", "mapping.ses_configurationset" = "ses:configuration-set", "mapping.ses_from_domain" = "ses:from-domain", "mapping.ses_operation" = "ses:opeation", "mapping.ses_outgoing_ip" = "ses:outgoing-ip", "mapping.ses_source_ip" = "ses:source-ip", "mapping.ses_source_tls_version" = "ses:source-tls-version" ) LOCATION 's3://aws-s3-ses-analytics-<aws-account-number>/partitioned/' TBLPROPERTIES ( "projection.enabled" = "true", "projection.sender_identity.type" = "injected", "projection.event_type.type" = "enum", "projection.event_type.values" = "Bounce,Complaint,Delivery,Send,Reject,Open,Click,`Rendering Failure`,DeliveryDelay,Subscription", "projection.day.type" = "date", "projection.day.format" = "yyyy/MM/dd", 'projection.day.range' = 'NOW-40DAYS,NOW', "projection.day.interval" = "1", "projection.day.interval.unit" = "DAYS", "storage.location.template" = "s3://aws-s3-ses-analytics-<aws-account-number>/partitioned/${sender_identity}/${event_type}/${day}/" )
実行例
- 指定した年月日、ドメインで発生したバウンスメールの参照を試みました
- SQL
SELECT * FROM sesmaster_partition where sender_identity ='<ドメイン>' and event_type = 'Bounce' and day ='2024/03/08'
- 当日、前日のBounceメールを対象とし、バウンス内容を展開を試みました
SELECT c.*, b.action as bounceaction, b.diagnosticcode as bouncediagnosticcode, b.emailaddress as bounceemailaddress from ( select messageid, recipient.action, recipient.diagnosticcode, recipient.emailaddress FROM ( SELECT mail.messageId as messageid, bounce.bouncedrecipients as bouncedrecipients FROM "default"."sesmaster_partition" where sender_identity = '<ドメイン>' and day in ( date_format(current_timestamp, '%Y/%m/%d'), date_format(current_timestamp - interval '1' day, '%Y/%m/%d') ) and EventType = 'Bounce' ) a CROSS JOIN UNNEST(bouncedrecipients) as t(recipient) ) b, ( SELECT eventtype as eventtype, mail.messageId as mailmessageid, mail.timestamp as mailtimestamp, mail.source as mailsource, mail.sendingAccountId as mailsendingAccountId, mail.commonHeaders.subject as mailsubject, mail.tags.ses_configurationset as mailses_configurationset, mail.tags.ses_source_ip as mailses_source_ip, mail.tags.ses_from_domain as mailses_from_domain, mail.tags.ses_outgoing_ip as mailses_outgoing_ip, bounce.bounceType as bouncebounceType, bounce.bouncesubtype as bouncebouncesubtype, bounce.feedbackid as bouncefeedbackid, bounce.timestamp as bouncetimestamp, bounce.reportingMTA as bouncereportingmta FROM "default"."sesmaster_partition" where sender_identity = '<ドメイン>' and day in ( date_format(current_timestamp, '%Y/%m/%d'), date_format(current_timestamp - interval '1' day, '%Y/%m/%d') ) and EventType = 'Bounce' ) c where b.messageid = c.mailmessageid
- 実行結果
# eventtype mailmessageid mailtimestamp mailsource mailsendingAccountId mailsubject mailses_configurationset mailses_source_ip mailses_from_domain mailses_outgoing_ip bouncebounceType bouncebouncesubtype bouncefeedbackid bouncetimestamp bouncereportingmta bounceaction bouncediagnosticcode bounceemailaddress 2 Bounce 0106018e1f0f23d1-60dd8a27-b3d5-47d3-9cd0-335ce124ebd0-000000 2024-03-08T17:14:29.713Z test-bounce-0309@<domain> 000000000000 test-bounce ["default-1"] ["162.120.155.65"] ["<domain>"] Permanent General 0106018e1f0f2751-e58a1217-7740-40b5-ac80-fdc4b3ab6fdb-000000 2024-03-08T17:14:30.701Z dns; e234-12.smtp-out.ap-northeast-1.amazonses.com failed smtp; 550 5.1.1 user unknown bounce@simulator.amazonses.com
費用
約1万通のメールをSESで送信したアカウント、Firehoseの費用で要したコストは0.016ドルでした。
約1万通を送信したSESの利用費は、約1.1ドル。Firehose 導入によるコスト増加の影響は1%台でした。
まとめ
Amazon SESから送信したメール、配送遅延や不着の調査原因を行う場合、SESのイベントログは重要な手がかりとなります。
特に不特定多数、外部メールアドレス宛に Amazon SESを利用して メールを一定以上の規模で送信する場合には、予期せぬ問題発生に備えて、今回紹介させて戴いた SESログ記録用の Firehose と、 Athenaなどの準備をお勧めします。
CloudFormationテンプレート
AWSTemplateFormatVersion: 2010-09-09 Description: 'S3 Bucket and Firehose with dynamic partition support (senders,EventType) that uses SES Config Set' Resources: s3BucketSESEvents: Type: AWS::S3::Bucket Properties: BucketEncryption: ServerSideEncryptionConfiguration: - ServerSideEncryptionByDefault: SSEAlgorithm: AES256 BucketName: !Join - '' - - 'aws-s3-ses-analytics-' - !Ref AWS::AccountId - '-' - !Ref AWS::Region LifecycleConfiguration: Rules: - Id: !Sub 'Delete-After-40-Days' ExpirationInDays: 40 Status: Enabled S3SESEventsBucketPolicy: Type: AWS::S3::BucketPolicy Properties: Bucket: !Ref s3BucketSESEvents PolicyDocument: Version: 2012-10-17 Statement: - Action: - s3:GetObject - s3:ListBucket - s3:PutObject Effect: Allow Resource: - !Sub arn:aws:s3:::${s3BucketSESEvents} - !Sub arn:aws:s3:::${s3BucketSESEvents}/* Principal: AWS: !GetAtt ConfigSetPermissionPutFirehose.Arn firehoseSESRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: firehose.amazonaws.com Action: 'sts:AssumeRole' Condition: StringEquals: 'sts:ExternalId': !Ref 'AWS::AccountId' Policies: - PolicyName: !Join - '' - - 'aws-ses-analytics-kinesis-policy-' - !Ref AWS::StackName PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - 's3:AbortMultipartUpload' - 's3:GetBucketLocation' - 's3:GetObject' - 's3:ListBucket' - 's3:ListBucketMultipartUploads' - 's3:PutObject' - 'lambda:InvokeFunction' - 'lambda:GetFunctionConfiguration' Resource: - !Join - '' - - 'arn:aws:s3:::' - !Ref s3BucketSESEvents - !Join - '' - - 'arn:aws:s3:::' - !Ref s3BucketSESEvents - '*' firehoseDeliveryStream: Type: AWS::KinesisFirehose::DeliveryStream Properties: DeliveryStreamEncryptionConfigurationInput: KeyType: AWS_OWNED_CMK DeliveryStreamName: !Join - '' - - 'aws-ses-analytics-kinesis-' - !Ref AWS::AccountId ExtendedS3DestinationConfiguration: BucketARN: !Join - '' - - 'arn:aws:s3:::' - !Ref s3BucketSESEvents BufferingHints: IntervalInSeconds: 900 SizeInMBs: 128 CloudWatchLoggingOptions: Enabled: true LogGroupName: !Ref 'FirehoseCWLogGroup' LogStreamName: S3Delivery CompressionFormat: GZIP ErrorOutputPrefix: '!{firehose:error-output-type}/' Prefix: "partitioned/!{partitionKeyFromQuery:SenderIdentity}/!{partitionKeyFromQuery:EventType}/!{timestamp:yyyy/MM/dd}/" RoleARN: !GetAtt firehoseSESRole.Arn ProcessingConfiguration: Enabled: true Processors: - Type: MetadataExtraction Parameters: - ParameterName: MetadataExtractionQuery ParameterValue: '{SenderIdentity: .mail.tags."ses:sender-identity"[0], EventType:.eventType}' - ParameterName: JsonParsingEngine ParameterValue: JQ-1.6 DynamicPartitioningConfiguration: Enabled: true RetryOptions: DurationInSeconds: 300 ConfigSetPermissionPutFirehose: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: ses.amazonaws.com Action: sts:AssumeRole Path: '/service-role/' Policies: - PolicyName: !Join - '' - - 'AWS-ses-config-set-destination-' - !Ref AWS::StackName PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - firehose:PutRecord - firehose:PutRecordBatch Resource: - !Join - '' - - 'arn:aws:firehose:' - !Ref AWS::Region - ':' - !Ref AWS::AccountId - ':deliverystream/' - !Join - '' - - 'aws-ses-analytics-kinesis-' - !Ref AWS::AccountId - Effect: Allow Action: - logs:PutLogEvents Resource: !Sub '${FirehoseCWLogGroup.Arn}' FirehoseCWLogGroup: Type: AWS::Logs::LogGroup Properties: LogGroupName: !Sub '/aws/firehose/aws-ses-analytics-kinesis-${AWS::AccountId}' RetentionInDays: 7 FirehoseCWLogStream: Type: AWS::Logs::LogStream Properties: LogGroupName: !Ref 'FirehoseCWLogGroup' LogStreamName: S3Delivery